DagsterをKubernetes環境で構築してみた
個人的に気になっていたデータオーケストレーションツールのDagster。
このブログでは、以下の2つを実施していきます。
- サンプルコードのまま、DagsterをKubernetes環境で構築する
- 独自のパイプラインを定義したイメージをデプロイし、Dagsterの環境を更新する
準備
DagsterのKubernetesデプロイには、パッケージマネージャーのHelmを使用しています。
この公式ドキュメントの中にインストール方法についてのREADMEがあったので、こちらを参考に構築を進めていきます。
- 環境
- macOS Big Sur 11.5.1
- Docker Desktop v3.5.2
- Kubernetes on Docker Desktop v1.21.2
Helmの環境がなかったため、Homebrewでインストールしておきます。
$ brew install helm $ helm version version.BuildInfo{Version:"v3.6.3", GitCommit:"d506314abfb5d21419df8c7e7e68012379db2354", GitTreeState:"dirty", GoVersion:"go1.16.5"}
手順
Helmを使えば、GitHubに上がっているソースからそのままインストールできるみたいです。やってみます。
$ helm repo add dagster https://dagster-io.github.io/helm "dagster" has been added to your repositories $ helm install dagster-test dagster/dagster \ --namespace dagster \ --create-namespace NAME: dagster-test LAST DEPLOYED: Thu Sep 9 14:46:34 2021 NAMESPACE: dagster STATUS: deployed REVISION: 1 NOTES: Launched. You can access Dagit by running the following commands: export DAGIT_POD_NAME=$(kubectl get pods --namespace dagster -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster-test,component=dagit" -o jsonpath="{.items[0].metadata.name}") echo "Visit http://127.0.0.1:8080 to open Dagit" kubectl --namespace dagster port-forward $DAGIT_POD_NAME 8080:80
Launched. You can access Dagit by running the following commands:
とあるので、実行してhttp://127.0.0.1:8080にアクセスします。
$ export DAGIT_POD_NAME=$(kubectl get pods --namespace dagster -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster-test,component=dagit" -o jsonpath="{.items[0].metadata.name}") $ echo "Visit http://127.0.0.1:8080 to open Dagit" $ kubectl --namespace dagster port-forward $DAGIT_POD_NAME 8080:80
立ち上がりました!コンテナ環境でも爆速でデプロイできました。
Dagsterのチュートリアルが気になる方は、こちらの記事をご参照ください。
コンポーネントの構成
先のデプロイで使用した各種パラメータは、values.yamlというHelmで使用するYAMLに記載されています。このパラメータを調節することで、導入したい環境に合わせたDagsterを構築することができます。下図のコンポーネントアーキテクチャ別に分類すると、以下のように分けられそうです。
※出典: Deploying with Helm | Dagster
Daemon
- Dagster Daemon
- キューに実行を作成したり、スケジュールやセンサーを稼働させるデーモン?
- Scheduler
- ジョブのスケジューラー
- Run Launcher
- 実行状況を制御し、Run Workerのジョブを作成する
Dagit
- Dagit
- User Code DeploymentのgRPCサーバーとやり取りするWebサーバー
Database
- PostgreSQL
- 実行状態や履歴などを保管するDB
Run Worker
- Pipeline Run
- ジョブの実行を担う?
User Code Deployment
- User Code Deployments
- DagitやDagsterがリポジトリ情報や現在のイメージ情報を取得するためのgRPCサーバーを構築する
- ここにユーザーが定義するパイプラインやジョブが含まれている
その他
- Compute Log Manager
- Dagsterは処理中の中間データ・ファイルをS3等に配置できるが、それを管理する機能?
- 先のデプロイでは使用されていない
- RabbitMQ
- Celeryのバックエンドとなるメッセージブローカー
- Redis
- Celeryのバックエンドとなるメッセージブローカー
- Flower
- Celery用のWebインターフェイス
- Ingress
- DagitやFlower用のIngress
- busybox
- 接続状態の監視を行うbusybox
上の概観的に、ユーザーはUser Code Deploymentに対して、ジョブやパイプラインの定義を更新してい流れになりそう。
というわけで、話はBuild Docker image for User Code | Dagsterに繋がってきます。
独自のパイプラインのデプロイ
パイプラインデプロイの前に、kubectlにconfigを設定します。
$ kubectl config set-context dagster --namespace default --cluster docker-desktop --user=docker-desktop Context "dagster" created. $ kubectl config get-contexts CURRENT NAME CLUSTER AUTHINFO NAMESPACE dagster docker-desktop docker-desktop default * docker-desktop docker-desktop docker-desktop $ kubectl config use-context dagster Switched to context "dagster". $ kubectl config current-context dagster
自身で実装したパイプラインをデプロイするには、まずそのパイプラインコードが含まれているDagsterリポジトリのDockerイメージをビルドする必要があります。
サンプルで用意されているDockerfileはリポジトリの以下の位置にいます。
k8s-example/ ├ build_cache/ │ └ example_project/ │ ├ example_repo/ | | └ repo.py │ ├ run_config/ | | ├ celery_k8s.yaml | | ├ celery_k8s_grpc.yaml | | └ pipeline.yaml │ └ workspace.yaml ├ Dockerfile <- ココ ├ last_updated.yaml └ versions.yaml
このDockerfile
ではPythonライブラリのインストールや、build_cache
ディレクトリをコピーする処理を行なっています。
ARG BASE_IMAGE FROM "${BASE_IMAGE}" ARG DAGSTER_VERSION # ==> Add Dagster layer RUN \ # Cron apt-get update -yqq \ && apt-get install -yqq cron \ # Dagster && pip install \ dagster==${DAGSTER_VERSION} \ dagster-postgres==${DAGSTER_VERSION} \ dagster-celery[flower,redis,kubernetes]==${DAGSTER_VERSION} \ dagster-aws==${DAGSTER_VERSION} \ dagster-k8s==${DAGSTER_VERSION} \ dagster-celery-k8s==${DAGSTER_VERSION} \ # Cleanup && rm -rf /var \ && rm -rf /root/.cache \ && rm -rf /usr/lib/python2.7 \ && rm -rf /usr/lib/x86_64-linux-gnu/guile # ==> Add user code layer # Example pipelines COPY build_cache/ /
要は build_cache
配下にパイプラインのスクリプトを構築しておけば、パイプラインのイメージが作成できます。今回はrepo.py
の中身をA More Complex DAGの内容で上書きしてビルドします。
import csv import requests from dagster import pipeline, solid @solid def download_cereals(): response = requests.get("https://docs.dagster.io/assets/cereal.csv") lines = response.text.split("\n") return [row for row in csv.DictReader(lines)] @solid def find_highest_calorie_cereal(cereals): sorted_cereals = list( sorted(cereals, key=lambda cereal: cereal["calories"]) ) return sorted_cereals[-1]["name"] @solid def find_highest_protein_cereal(cereals): sorted_cereals = list( sorted(cereals, key=lambda cereal: cereal["protein"]) ) return sorted_cereals[-1]["name"] @solid def display_results(context, most_calories, most_protein): context.log.info(f"Most caloric cereal: {most_calories}") context.log.info(f"Most protein-rich cereal: {most_protein}") @pipeline def complex_pipeline(): cereals = download_cereals() display_results( most_calories=find_highest_calorie_cereal(cereals), most_protein=find_highest_protein_cereal(cereals), )
Dockerイメージのビルド先ですが、今回はDocker HubにPublicで上げて進めていきます。Docker Hubにログイン後、Create Repository
をクリックします。
Public
を選択し、ポジトリ名をdagster-k8s-demo
として作成します。
BASE_IMAGE
とDAGSTER_VERSION
を引数に渡しながらビルドします。pushすればDockerHubにイメージが上がっているはずです。
$ docker build -t tharuta/dagster-user-code:0.1 . --build-arg BASE_IMAGE=python:3.7.8-slim --build-arg DAGSTER_VERSION=0.12.9 $ docker push tharuta/dagster-user-code:0.1 Using default tag: latest The push refers to repository [docker.io/tharuta/dagster-user-code] d8c09038a574: Pushed c0eae5bac8b2: Pushed 880d8e38a8e4: Mounted from dagster/dagster-celery-k8s 23746fb81c22: Mounted from dagster/dagster-celery-k8s 2de5ba74fd9a: Mounted from dagster/dagster-celery-k8s 2b99e2403063: Mounted from dagster/dagster-celery-k8s d0f104dc0a1f: Mounted from dagster/dagster-celery-k8s latest: digest: sha256:8ee1e4a2d8292e41e78e5f5610bea9de4346df3b023afb20544837f2f31fb832 size: 1789
続いてhelmの設定ファイルの user-deployments
の箇所を、先ほどのDockerイメージで上書きします。現在のvalues.yaml
の内容を以下のコマンドで出力させます。
$ helm show values dagster/dagster > values.yaml
values.yaml
の内、以下のrepository
とtag
を書き換えます。
# values.yaml dagster-user-deployments: # Creates a workspace file with the gRPC servers hosting your user code. enabled: true # If you plan on deploying user code in a separate Helm release, set this to false. enableSubchart: true # List of unique deployments deployments: - name: "k8s-example-user-code-1" image: # When a tag is not supplied, it will default as the Helm chart version. repository: "docker.io/tharuta/dagster-user-code" tag: latest
values.yaml
を使用して、helmでpodsをアップデートします。podsが正常に稼働していることを確認したら、環境変数とポートフォワードを再起動します。
$ helm upgrade --install dagster dagster/dagster -f values.yaml Release "dagster" has been upgraded. Happy Helming! NAME: dagster LAST DEPLOYED: Thu Sep 9 15:43:10 2021 NAMESPACE: default STATUS: deployed REVISION: 4 NOTES: Launched. You can access Dagit by running the following commands: export DAGIT_POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster,component=dagit" -o jsonpath="{.items[0].metadata.name}") echo "Visit http://127.0.0.1:8080 to open Dagit" kubectl --namespace default port-forward $DAGIT_POD_NAME 8080:80 $ kubectl get pods NAME READY STATUS RESTARTS AGE dagster-daemon-5dbc58d6fc-6k76j 1/1 Running 0 29m dagster-dagit-5dc77c6f98-s5wps 1/1 Running 0 7m39s dagster-dagster-user-deployments-k8s-example-user-code-1-b2ps7s 1/1 Running 0 7m39s dagster-postgresql-0 1/1 Running 0 29m $ export DAGIT_POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster,component=dagit" -o jsonpath="{.items[0].metadata.name}") $ kubectl --namespace default port-forward $DAGIT_POD_NAME 8080:80
無事意図したパイプラインに更新できました!
この仕組みによって、様々なユーザーがそれぞれパイプラインを記述しても、DagsterのUser Code Deploymentの部分だけ更新すればいい点で、サービス全体を疎結合に保つことができます。これは便利!
次のステップ
次回はこの辺りをやってみようかなと考えています。
- dbtとインテグレーションしてみる
- 本番のワークロードを踏まえた上で、AWSのEKSにデプロイ
- 本番のワークロードを踏まえた上で、GCPのGKEでデプロイ